package com.gitee.hermer.boot.jee.io.activemq;

/**
 * A simple wrapper around ActiveMQ.
 *
 * @author tumc
 */

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.gitee.hermer.boot.jee.commons.log.UtilsContext;

/**
 * Convenience wrapper around ActiveMQ/JMS for sending serialized messages and
 * registering long-lived message listeners.
 *
 * <p>Each send obtains a connection from the injected {@link PooledConnectionFactory},
 * uses it for a single transacted send and closes it again. Listener registration
 * caches one connection for queues and one for topics until
 * {@link #closeConnectionListener()} is called.
 */
public class ActiveMQExecute extends UtilsContext{

	@Autowired
	private PooledConnectionFactory connectionFactory = null;

	/** Connection cached for queue listeners; created on first use. */
	private Connection connectionQueue = null;
	/** Connection cached for topic listeners; created on first use. */
	private Connection connectionTopic = null;

	/** Session created by the most recent {@link #queueMessageListener} call. */
	private Session sessionQueue = null;
	/** Session created by the most recent {@link #topicMessageListener} call. */
	private Session sessionTopic = null;

	/**
	 * Returns the session created by the most recent queue listener registration.
	 *
	 * @return the queue listener session, or {@code null} if none has been created
	 */
	public Session getSessionQueue() {
		return sessionQueue;
	}

	/**
	 * Returns the session created by the most recent topic listener registration.
	 *
	 * @return the topic listener session, or {@code null} if none has been created
	 */
	public Session getSessionTopic() {
		return sessionTopic;
	}

	public ActiveMQExecute(){
	}

	/**
	 * Sends a serialized object message to a queue inside its own transaction.
	 *
	 * <p>Any failure after the connection has been obtained is logged and not rethrown.
	 *
	 * @param queueName name of the target queue
	 * @param serializable payload of the object message
	 * @throws JMSException if the connection cannot be obtained from the factory
	 */
	public void sendQueueMsg(String queueName, Serializable serializable) throws JMSException{
		Connection connection = connectionFactory.createConnection();
		try {
			sendInTransaction(connection, queueName, false, serializable);
		} catch (Exception e) {
			error(e.getMessage(),e);
		} finally{
			closeConnection(connection);
		}
	}

	/**
	 * Sends a serialized object message to a topic inside its own transaction.
	 *
	 * <p>A {@link JMSException} raised after the connection has been obtained is logged
	 * and not rethrown.
	 *
	 * @param topicName name of the target topic
	 * @param serializable payload of the object message
	 * @throws JMSException if the connection cannot be obtained from the factory
	 */
	public void sendTopicMsg(String topicName, Serializable serializable) throws JMSException{
		Connection connection = connectionFactory.createConnection();
		try {
			sendInTransaction(connection, topicName, true, serializable);
		} catch (JMSException e) {
			error(e.getMessage(),e);
		} finally{
			closeConnection(connection);
		}
	}

	/**
	 * Creates a transacted session on the given connection, sends one object message
	 * and commits. If anything fails before the commit completes, the session is
	 * rolled back. The session is always closed before returning.
	 *
	 * <p>All JMS objects are kept in local variables so that concurrent sends do not
	 * share any state through this instance.
	 *
	 * @param connection connection to create the session on
	 * @param destinationName name of the queue or topic
	 * @param topic {@code true} to send to a topic, {@code false} to send to a queue
	 * @param serializable payload of the object message
	 * @throws JMSException if creating the session, sending or committing fails
	 */
	private void sendInTransaction(Connection connection, String destinationName, boolean topic,
			Serializable serializable) throws JMSException{
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		boolean committed = false;
		try {
			Destination target;
			if (topic) {
				target = session.createTopic(destinationName);
			} else {
				target = session.createQueue(destinationName);
			}
			MessageProducer producer = session.createProducer(target);
			ObjectMessage objectMessage = session.createObjectMessage(serializable);
			producer.send(objectMessage);
			session.commit();
			committed = true;
		} finally {
			if (!committed) {
				// Never leave a half-finished transaction behind on a pooled resource.
				rollbackQuietly(session);
			}
			closeSession(session);
		}
	}

	/**
	 * Registers a listener on a topic using the cached topic connection.
	 *
	 * <p>The connection is created on first use, started, and given the exception
	 * listener; a transacted session and a consumer are then created and the message
	 * listener is attached. A {@link JMSException} raised during that setup is logged
	 * and not rethrown.
	 *
	 * @param topicName name of the topic to consume from
	 * @param messageListener listener that receives the messages
	 * @param exceptionListener listener notified of connection problems
	 * @throws JMSException if the connection cannot be obtained from the factory
	 */
	public void topicMessageListener(String topicName,MessageListener messageListener,ExceptionListener exceptionListener) throws JMSException{
		if (connectionTopic == null) {
			connectionTopic = connectionFactory.createConnection();
		}
		try {
			connectionTopic.start();
			connectionTopic.setExceptionListener(exceptionListener);
			sessionTopic = connectionTopic.createSession(true, Session.AUTO_ACKNOWLEDGE);
			Destination topic = sessionTopic.createTopic(topicName);
			MessageConsumer consumer = sessionTopic.createConsumer(topic);
			consumer.setMessageListener(messageListener);
		} catch (JMSException e) {
			error(e.getMessage(),e);
		}
	}

	/**
	 * Registers a listener on a queue using the cached queue connection.
	 *
	 * <p>The connection is created on first use, started, and given the exception
	 * listener; a transacted session and a consumer are then created and the message
	 * listener is attached. A {@link JMSException} raised during that setup is logged
	 * and not rethrown.
	 *
	 * @param queueName name of the queue to consume from
	 * @param messageListener listener that receives the messages
	 * @param exceptionListener listener notified of connection problems
	 * @throws JMSException if the connection cannot be obtained from the factory
	 */
	public void queueMessageListener(String queueName,MessageListener messageListener,ExceptionListener exceptionListener) throws JMSException{
		if (connectionQueue == null) {
			connectionQueue = connectionFactory.createConnection();
		}
		try {
			connectionQueue.start();
			connectionQueue.setExceptionListener(exceptionListener);
			sessionQueue = connectionQueue.createSession(true, Session.AUTO_ACKNOWLEDGE);
			Destination queue = sessionQueue.createQueue(queueName);
			MessageConsumer consumer = sessionQueue.createConsumer(queue);
			consumer.setMessageListener(messageListener);
		} catch (JMSException e) {
			error(e.getMessage(),e);
		}
	}

	/**
	 * Closes the cached listener connections (queue and topic) and forgets them, so
	 * that the next listener registration creates a fresh connection.
	 */
	public void closeConnectionListener(){
		closeConnection(connectionQueue);
		closeConnection(connectionTopic);
		connectionQueue = null;
		connectionTopic = null;
	}

	/**
	 * Closes a connection, logging instead of propagating any failure.
	 *
	 * @param connection connection to close; {@code null} is ignored
	 */
	private void closeConnection(Connection connection){
		if (connection == null) {
			return;
		}
		try {
			connection.close();
		} catch (JMSException e) {
			error(e.getMessage(),e);
		}
	}

	/**
	 * Closes a session, logging instead of propagating any failure.
	 *
	 * @param session session to close
	 */
	private void closeSession(Session session){
		try {
			session.close();
		} catch (JMSException e) {
			error(e.getMessage(),e);
		}
	}

	/**
	 * Rolls back a session's current transaction, logging instead of propagating any
	 * failure so that the original error is not masked.
	 *
	 * @param session session to roll back
	 */
	private void rollbackQuietly(Session session){
		try {
			session.rollback();
		} catch (JMSException e) {
			error(e.getMessage(),e);
		}
	}

}